{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 7 个Spark编程练习题"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "spark = org.apache.spark.sql.SparkSession@74b39ddf\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "org.apache.spark.sql.SparkSession@74b39ddf"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import org.apache.spark.sql.SparkSession\n",
    "\n",
    "val spark = SparkSession\n",
    ".builder()\n",
    ".appName(\"Spark SQL basic example\")\n",
    ".enableHiveSupport()\n",
    ".getOrCreate()\n",
    "\n",
    "//开启隐式转换\n",
    "import spark.implicits._"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 一，求平均值"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "data = List(17, 83, 87, 99, 97, 33, 9, 41, 52)\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "List(17, 83, 87, 99, 97, 33, 9, 41, 52)"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//任务：求data的平均值\n",
    "import util.Random\n",
    "val data = for(i<- List.range(1,10)) yield Random.nextInt(100)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "57.55555555555556\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "rdd = ParallelCollectionRDD[0] at parallelize at <console>:37\n",
       "mean = 57.55555555555556\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "57.55555555555556"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//使用RDD编程实现\n",
    "val rdd = sc.parallelize(data,5)\n",
    "val mean = rdd.map(_.toDouble).reduce(_+_)/rdd.count\n",
    "println(mean)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------------+\n",
      "|       avg(value)|\n",
      "+-----------------+\n",
      "|57.55555555555556|\n",
      "+-----------------+\n",
      "\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "df = [value: int]\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "[value: int]"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//使用SparkSQL编程实现\n",
    "val df = data.toDF(\"value\")\n",
    "df.agg(\"value\"->\"avg\").show"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 二， WordCount统计词频"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "file = wordcount\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "wordcount"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//任务：统计file中每个词的词频\n",
    "val file = \"wordcount\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "rdd = wordcount MapPartitionsRDD[11] at textFile at <console>:39\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "Array((BeiJing,2), (hello,5), (XiCheng,1), (world,1), (China,2), (TianAnMen,1))"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//使用RDD编程实现\n",
    "val rdd = sc.textFile(file)\n",
    "rdd.flatMap(_.trim.split(\" \")).map((_,1)).reduceByKey(_+_).collect"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------+-----+\n",
      "|    value|count|\n",
      "+---------+-----+\n",
      "|  BeiJing|    2|\n",
      "|    hello|    5|\n",
      "|    China|    2|\n",
      "|    world|    1|\n",
      "|  XiCheng|    1|\n",
      "|TianAnMen|    1|\n",
      "+---------+-----+\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "df = [value: string]\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "lastException: Throwable = null\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "[value: string]"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//使用SparkSQL编程实现\n",
    "val df = spark.read.option(\"header\",\"false\").csv(file).toDF(\"value\")\n",
    "df.flatMap(row=>row(0).toString.trim.split(\" \")).groupBy(\"value\").count.show"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 三，求TopN "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "students = List((LiLei,18,87), (HanMeiMei,16,77), (DaChui,16,66), (Jim,18,80), (RuHua,20,50))\n",
       "n = 3\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "3"
      ]
     },
     "execution_count": 35,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//任务：\n",
    "//有一批学生信息表格，包括name,age,score\n",
    "//找出score排名前3的学生\n",
    "val students = List((\"LiLei\",18,87),\n",
    "                   (\"HanMeiMei\",16,77),\n",
    "                   (\"DaChui\",16,66),\n",
    "                   (\"Jim\",18,80),\n",
    "                   (\"RuHua\",20,50))\n",
    "val n = 3"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "rdd = ParallelCollectionRDD[193] at parallelize at <console>:45\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "Array((LiLei,18,87), (Jim,18,80), (HanMeiMei,16,77))"
      ]
     },
     "execution_count": 36,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//使用RDD编程实现\n",
    "val rdd = sc.parallelize(students)\n",
    "rdd.sortBy(_._3,ascending = false).take(n)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------+---+-----+\n",
      "|     name|age|score|\n",
      "+---------+---+-----+\n",
      "|    LiLei| 18|   87|\n",
      "|      Jim| 18|   80|\n",
      "|HanMeiMei| 16|   77|\n",
      "+---------+---+-----+\n",
      "\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "df = [name: string, age: int ... 1 more field]\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "[name: string, age: int ... 1 more field]"
      ]
     },
     "execution_count": 38,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//使用SparkSQL编程实现\n",
    "val df = students.toDF(\"name\",\"age\",\"score\")\n",
    "df.orderBy(df(\"score\").desc).limit(n).show"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 四，求最大值最小值"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "data = List(1, 7, 8, 5, 3, 18, 34, 23, 67, 53, 9, 0, 12, 8)\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "List(1, 7, 8, 5, 3, 18, 34, 23, 67, 53, 9, 0, 12, 8)"
      ]
     },
     "execution_count": 15,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//任务：求最大值最小值\n",
    "val data = List(1,7,8,5,3,18,34,23,67,53,9,0,12,8)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "max_value:67\n",
      "min_value:0\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "rdd = ParallelCollectionRDD[67] at parallelize at <console>:39\n",
       "max_value = 67\n",
       "min_value = 0\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "0"
      ]
     },
     "execution_count": 16,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//使用RDD编程实现，方案1\n",
    "val rdd = sc.parallelize(data,3)\n",
    "val max_value = rdd.reduce((a,b)=> if(a>b) a else b)\n",
    "val min_value = rdd.reduce((a,b)=> if(a>b) b else a)\n",
    "println(\"max_value:\" + max_value)\n",
    "println(\"min_value:\" + min_value)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "rdd = ParallelCollectionRDD[68] at parallelize at <console>:39\n",
       "temp = MapPartitionsRDD[69] at mapPartitions at <console>:40\n",
       "result = (0,67)\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "(0,67)"
      ]
     },
     "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//使用RDD编程实现，方案2\n",
    "val rdd = sc.parallelize(data,3)\n",
    "val temp = rdd.mapPartitions(iterator => {\n",
    "    var min = Integer.MAX_VALUE\n",
    "    var max = Integer.MIN_VALUE\n",
    "    for(x <- iterator){\n",
    "        if(x>max) max = x\n",
    "        if(x<min) min = x\n",
    "    }\n",
    "    Iterator((min,max))\n",
    "})\n",
    "val result = temp.reduce((a,b)=>\n",
    "          {val min = if(a._1<= b._1) a._1 else b._1\n",
    "           val max = if(a._2 >= b._2) a._2 else b._2\n",
    "           (min,max)\n",
    "          })\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------+---------+\n",
      "|max_value|min_value|\n",
      "+---------+---------+\n",
      "|       67|        0|\n",
      "+---------+---------+\n",
      "\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "df = [value: int]\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "[value: int]"
      ]
     },
     "execution_count": 18,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//使用SparkSQL编程实现\n",
    "import org.apache.spark.sql.functions._\n",
    "val df = data.toDF(\"value\")\n",
    "df.agg(max(\"value\") as \"max_value\",min(\"value\") as \"min_value\").show"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 五，排序并返回序号"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "data = List(1, 7, 8, 5, 3, 18, 34, 9, 0, 12, 8)\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "List(1, 7, 8, 5, 3, 18, 34, 9, 0, 12, 8)"
      ]
     },
     "execution_count": 19,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//任务：排序并返回序号\n",
    "val data = List(1,7,8,5,3,18,34,9,0,12,8)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "rdd = ParallelCollectionRDD[78] at parallelize at <console>:42\n",
       "len = 11\n",
       "sortedrdd = MapPartitionsRDD[87] at repartition at <console>:44\n",
       "index = ParallelCollectionRDD[88] at parallelize at <console>:45\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "Array((0,0), (1,1), (2,3), (3,5), (4,7), (5,8), (6,8), (7,9), (8,12), (9,18), (10,34))"
      ]
     },
     "execution_count": 20,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//使用RDD编程实现：方案1\n",
    "val rdd = sc.parallelize(data,3)\n",
    "val len = rdd.count\n",
    "val sortedrdd = rdd.map((_,1)).sortByKey().map(_._1).repartition(1)\n",
    "val index = sc.parallelize(0 to len.toInt-1,1)\n",
    "index.zip(sortedrdd).collect"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "rdd = ParallelCollectionRDD[90] at parallelize at <console>:43\n",
       "sortedrdd = MapPartitionsRDD[99] at repartition at <console>:44\n",
       "idx = -1\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "Array((0,0), (1,1), (2,3), (3,5), (4,7), (5,8), (6,8), (7,9), (8,12), (9,18), (10,34))"
      ]
     },
     "execution_count": 21,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//使用RDD编程实现：方案2\n",
    "val rdd = sc.parallelize(data,3)\n",
    "val sortedrdd = rdd.map((_,1)).sortByKey().map(_._1).repartition(1)\n",
    "var idx = -1\n",
    "sortedrdd.map(value => {\n",
    "    idx+=1\n",
    "    (idx,value)\n",
    "}).collect"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "(0,0)(1,1)(2,3)(3,5)(4,7)(5,8)(6,8)(7,9)(8,12)(9,18)(10,34)"
     ]
    },
    {
     "data": {
      "text/plain": [
       "rdd = ParallelCollectionRDD[117] at parallelize at <console>:43\n",
       "result = Array((0,0), (1,1), (2,3), (3,5), (4,7), (5,8), (6,8), (7,9), (8,12), (9,18), (10,34))\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "Array((0,0), (1,1), (2,3), (3,5), (4,7), (5,8), (6,8), (7,9), (8,12), (9,18), (10,34))"
      ]
     },
     "execution_count": 24,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//使用RDD编程实现：方案3\n",
    "val rdd = sc.parallelize(data,3)\n",
    "//利用zipWithIndex方法\n",
    "val result = rdd.map((_,1)).sortByKey()\n",
    "    .map(_._1).zipWithIndex().map(x =>(x._2,x._1)).collect\n",
    "result.foreach(print)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+-----+\n",
      "|value|index|\n",
      "+-----+-----+\n",
      "|    0|    0|\n",
      "|    1|    1|\n",
      "|    3|    2|\n",
      "|    5|    3|\n",
      "|    7|    4|\n",
      "|    8|    5|\n",
      "|    8|    6|\n",
      "|    9|    7|\n",
      "|   12|    8|\n",
      "|   18|    9|\n",
      "|   34|   10|\n",
      "+-----+-----+\n",
      "\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "df = [value: int]\n",
       "w = org.apache.spark.sql.expressions.WindowSpec@4a5eda7b\n",
       "result = [value: int, index: int]\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "[value: int, index: int]"
      ]
     },
     "execution_count": 25,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//使用SparkSQL编程实现：方案1\n",
    "import org.apache.spark.sql.expressions.Window \n",
    "import org.apache.spark.sql.functions.row_number \n",
    "\n",
    "val df = data.toDF(\"value\").sort(\"value\")\n",
    "val w = Window.orderBy(\"value\") \n",
    "\n",
    "val result = df.withColumn(\"index\", row_number().over(w)-1)\n",
    "result.show"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+-----+\n",
      "|idx|value|\n",
      "+---+-----+\n",
      "|  0|    0|\n",
      "|  1|    1|\n",
      "|  2|    3|\n",
      "|  3|    5|\n",
      "|  4|    7|\n",
      "|  5|    8|\n",
      "|  6|    8|\n",
      "|  7|    9|\n",
      "|  8|   12|\n",
      "|  9|   18|\n",
      "| 10|   34|\n",
      "+---+-----+\n",
      "\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "df = [value: int]\n",
       "rdd = MapPartitionsRDD[152] at map at <console>:46\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "MapPartitionsRDD[152] at map at <console>:46"
      ]
     },
     "execution_count": 26,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//使用SparkSQL编程实现：方案2\n",
    "\n",
    "val df = data.toDF(\"value\").sort(\"value\")\n",
    "val rdd = df.rdd.map(_(0)).zipWithIndex().map(x=>(x._2,x._1.toString.toInt))\n",
    "rdd.toDF(\"idx\",\"value\").show"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+-----+\n",
      "|idx|value|\n",
      "+---+-----+\n",
      "|  0|    0|\n",
      "|  1|    1|\n",
      "|  2|    3|\n",
      "|  3|    5|\n",
      "|  4|    7|\n",
      "|  5|    8|\n",
      "|  6|    8|\n",
      "|  7|    9|\n",
      "|  8|   12|\n",
      "|  9|   18|\n",
      "| 10|   34|\n",
      "+---+-----+\n",
      "\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "df = [value: int]\n",
       "idx = 10\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "index: ()Int\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "10"
      ]
     },
     "execution_count": 27,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//使用SparkSQL编程实现：方案3\n",
    "\n",
    "val df = data.toDF(\"value\").sort(\"value\")\n",
    "var idx = -1\n",
    "def index():Int ={\n",
    "    idx = idx+1\n",
    "    idx\n",
    "}\n",
    "spark.udf.register(\"index\",(x:Any) => index())\n",
    "df.selectExpr(\"index(value) as idx\",\"value\").show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "###  六，二次排序"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "students = List((LiLei,16,87), (HanMeiMei,17,87), (DaChui,16,77), (RuHua,18,50))\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "List((LiLei,16,87), (HanMeiMei,17,87), (DaChui,16,77), (RuHua,18,50))"
      ]
     },
     "execution_count": 28,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//任务：\n",
    "//首先根据学生的score从大到小排序，\n",
    "//如果score相同，根据age从大到小排序\n",
    "//数据表结构：name,age,score\n",
    "val students = List((\"LiLei\",16,87),\n",
    "                    (\"HanMeiMei\",17,87),\n",
    "                   (\"DaChui\",16,77),\n",
    "                   (\"RuHua\",18,50))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Student(HanMeiMei,17,87)\n",
      "Student(LiLei,16,87)\n",
      "Student(DaChui,16,77)\n",
      "Student(RuHua,18,50)\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "defined class Student\n",
       "rdd = MapPartitionsRDD[162] at map at <console>:32\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "MapPartitionsRDD[162] at map at <console>:32"
      ]
     },
     "execution_count": 29,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//使用RDD编程实现\n",
    "case class Student(name:String,age:Int,score:Int)\n",
    "    extends Ordered[Student] with Serializable{\n",
    "        override def compare(other:Student):Int = {\n",
    "        if(this.score - other.score!=0) {\n",
    "            this.score - other.score\n",
    "        }\n",
    "        else {\n",
    "            this.age - other.age\n",
    "        }\n",
    "        }\n",
    "    }\n",
    "\n",
    "val rdd = sc.parallelize(students).map(s=>Student(s._1,s._2,s._3))\n",
    "rdd.map((_,1)).sortByKey(ascending = false).map(_._1).collect.foreach(println)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------+---+-----+\n",
      "|     name|age|score|\n",
      "+---------+---+-----+\n",
      "|HanMeiMei| 17|   87|\n",
      "|    LiLei| 16|   87|\n",
      "|   DaChui| 16|   77|\n",
      "|    RuHua| 18|   50|\n",
      "+---------+---+-----+\n",
      "\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "df = [name: string, age: int ... 1 more field]\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "[name: string, age: int ... 1 more field]"
      ]
     },
     "execution_count": 30,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//使用SparkSQL编程实现\n",
    "val df= students.toDF(\"name\",\"age\",\"score\")\n",
    "df.sort($\"score\".desc,$\"age\".desc).show"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 七，连接操作"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "classes = List((class1,LiLei), (class1,HanMeiMei), (class2,DaChui), (class2,RuHua))\n",
       "scores = List((LiLei,76), (HanMeiMei,80), (DaChui,70), (RuHua,60))\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "List((LiLei,76), (HanMeiMei,80), (DaChui,70), (RuHua,60))"
      ]
     },
     "execution_count": 32,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//任务：\n",
    "//已知班级信息表和成绩表，找出班级平均分在75分以上的班级\n",
    "//班级信息表包括cls,name,成绩表包括name,score\n",
    "\n",
    "val classes = List((\"class1\",\"LiLei\"),\n",
    "              (\"class1\",\"HanMeiMei\"),\n",
    "              (\"class2\",\"DaChui\"),\n",
    "              (\"class2\",\"RuHua\"))\n",
    "val scores = List((\"LiLei\",76),\n",
    "             (\"HanMeiMei\",80),\n",
    "             (\"DaChui\",70),\n",
    "             (\"RuHua\",60))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "classesRDD = MapPartitionsRDD[173] at map at <console>:45\n",
       "scoresRDD = ParallelCollectionRDD[174] at parallelize at <console>:46\n",
       "joinedRDD = MapPartitionsRDD[178] at map at <console>:48\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "mean: (l: List[Int])Double\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "Array((class1,78.0))"
      ]
     },
     "execution_count": 33,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//RDD编程实现\n",
    "val classesRDD = sc.parallelize(classes).map(s=>(s._2,s._1))\n",
    "val scoresRDD = sc.parallelize(scores)\n",
    "\n",
    "val joinedRDD = scoresRDD.join(classesRDD).map(s=>(s._2._2,s._2._1))\n",
    "\n",
    "def mean(l:List[Int]):Double ={\n",
    "    val doublel = for(x<-l)yield x.toDouble\n",
    "    doublel.sum/(l.length)\n",
    "}\n",
    "\n",
    "joinedRDD.groupByKey()\n",
    "         .map(x=>(x._1,mean(x._2.toList)))\n",
    "         .filter(_._2>75)\n",
    "         .collect"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------+----------+\n",
      "|   cls|avg(score)|\n",
      "+------+----------+\n",
      "|class1|      78.0|\n",
      "+------+----------+\n",
      "\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "dfcls = [cls: string, name: string]\n",
       "dfscore = [name: string, score: int]\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "[name: string, score: int]"
      ]
     },
     "execution_count": 34,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "//SparkSQL编程实现\n",
    "val dfcls = classes.toDF(\"cls\",\"name\")\n",
    "val dfscore = scores.toDF(\"name\",\"score\")\n",
    "\n",
    "dfcls.join(dfscore,\"name\")\n",
    "     .groupBy(\"cls\")\n",
    "     .agg(\"score\"->\"avg\")\n",
    "     .where(\"avg(score)>75.0\")\n",
    "     .show"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Apache Toree - Scala",
   "language": "scala",
   "name": "apache_toree_scala"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".scala",
   "mimetype": "text/x-scala",
   "name": "scala",
   "pygments_lexer": "scala",
   "version": "2.11.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
